iT邦幫忙

2023 iThome 鐵人賽

DAY 22
0

在對於外部的連結之後,我們再細部介紹一下 Operator 的使用,其實在 Operator 層級也可以有不同的使用方法。Operator 本身有多種種類,其作用也有所不同。到目前為止我們使用的 PythonOperator 只是其中的一種 Operator。那我們今天就好好來了解一下 Operator。

Operator

什麼是 Operator

那我們首先再了解一下我們一直使用的 Operator 是什麼。Apache Airflow 中的 Operator 是用來定義工作流程中 Task 的元件。每個 Operator 定義了一個獨立的工作或操作,可以執行所指定的任務,像是執行 Python 函數、執行 Bash 命令、執行 SQL 查詢、觸發外部系統的操作等等。

Operator種類

這邊列出一些 Airflow Operator 類型和它們的用途:

  • PythonOperator: 在 first_dag 中我們皆使用 PythonOperator,PythonOperator 主要用於執行自定義的 Python 函數,並將其作為任務的一部分執行。
  • BashOperator: 用於執行 Bash 命令,通常用於執行 shell 腳本或命令行工具。
    作法範例:
from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id="bash",
    bash_command="echo 123",
)
  • MySqlOperator: 用於執行 SQL 查詢,通常用於從數據庫中擷取或更新、刪除數據等等。
    作法範例:
from airflow.providers.mysql.operators.mysql import MySqlOperator

# 執行 SELECT 操作:
select_task = MySqlOperator(
    task_id='mysql_select_task',
    mysql_conn_id='mysql_conn',  # 使用上一篇設定之 MySql connection
    sql="SELECT * FROM my_table WHERE column1 = 'value'",
    dag=dag,
)

# 執行 DELETE 操作:
delete_task = MySqlOperator(
    task_id='mysql_delete_task',
    mysql_conn_id='mysql_conn',
    sql="DELETE FROM my_table WHERE column1 = 'value'",
    dag=dag,
)
  • DummyOperator: 這種 Operator 不會做實際上的事情,通常用於在 workflow 中作為流程管理用。

  • DockerOperator: 用於啟動和管理 Docker 容器,可用於容器化應用程序的執行。

想要了解更多 Operator 種類可以參考官方文件:
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html

看到這邊不知道大家會不會覺得有些疑惑,那 Hook 和 Operator 的差別是什麼呢?實際在開發 DAG 中使用兩種進行開發都可以。比如說我們想要執行 MySQL 的指令,我們可以使用 MySqlOperator 也可以使用 PythonOperator 並在裡面使用 Hook。有不同的選擇可以讓我們在編寫 DAG 時有更大的靈活性。

客製 Operator

如果現有的 Operator 沒有滿足我們的需求,我們也可以自行客製我們的 Operator。要客製 Operator 的話我們可以以從 BaseOperator 類別繼承。這邊提供一個簡單範例給大家。

from airflow.models.baseoperator import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, my_param: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        message = f"My param: {self.my_param}"
        print(message)
        return message

要使用所建立的 MyCustomOperator 可以參考下面的程式

# 使用自訂 Operator
custom_task = MyCustomOperator(
    task_id='my_custom_task',
    my_param='Airflow',
    dag=dag,
)

了解了如何客製化的建立 Operator ,開發者便可以隨所需的需求,把經常會用到的工作建立成一個 Operator ,如此可以讓開發更有效率,也易於管理程式碼。

雖然之前常常使用到 Operator 但是或許對他不是那的熟悉,今天介紹完之後,大家不只可以使用 PythonOperator ,也可以針對任務情景使用不同的 Operator,甚至也可以自行定義 Operator。希望本篇有幫助到大家更能掌握 Operator。


上一篇
『Day21』使用 Conn、Hook 進行外部連接
下一篇
『Day23』來做個天氣資訊 DAG 吧 (上)
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言